From 14a5b54eb0bf59d0914b6b2c78d4ea1b596e6329 Mon Sep 17 00:00:00 2001 From: "mjw@wray-m-3.hpl.hp.com" Date: Thu, 21 Apr 2005 13:25:07 +0000 Subject: [PATCH] bitkeeper revision 1.1327.2.2 (4267a9b3MhPpljnjQ5IbfLdzcW2K3w) Remove twisted from the HTTP server and replace with a threaded server. Add classes to provide tcp and unix servers using threads instead of twisted. Remove use of twisted from the consoles, event server and HTTP resources Signed-off-by: Mike Wray --- .rootkeys | 15 +- tools/python/setup.py | 1 + tools/python/xen/web/SrvBase.py | 160 ++++++ tools/python/xen/web/SrvDir.py | 115 +++++ tools/python/xen/web/__init__.py | 1 + tools/python/xen/web/connection.py | 387 ++++++++++++++ tools/python/xen/web/defer.py | 3 + tools/python/xen/web/http.py | 516 +++++++++++++++++++ tools/python/xen/web/httpserver.py | 144 ++++++ tools/python/xen/web/protocol.py | 94 ++++ tools/python/xen/web/reactor.py | 9 + tools/python/xen/web/resource.py | 91 ++++ tools/python/xen/web/static.py | 46 ++ tools/python/xen/web/tcp.py | 90 ++++ tools/python/xen/web/unix.py | 76 +++ tools/python/xen/xend/EventServer.py | 99 ++-- tools/python/xen/xend/XendDomain.py | 25 +- tools/python/xen/xend/XendDomainInfo.py | 84 +-- tools/python/xen/xend/scheduler.py | 24 +- tools/python/xen/xend/server/SrvBase.py | 185 +------ tools/python/xen/xend/server/SrvDaemon.py | 32 +- tools/python/xen/xend/server/SrvDeviceDir.py | 9 - tools/python/xen/xend/server/SrvDir.py | 112 +--- tools/python/xen/xend/server/SrvDomain.py | 2 +- tools/python/xen/xend/server/SrvDomainDir.py | 4 +- tools/python/xen/xend/server/SrvRoot.py | 5 +- tools/python/xen/xend/server/SrvServer.py | 27 +- tools/python/xen/xend/server/SrvUsbif.py | 2 +- tools/python/xen/xend/server/SrvXendLog.py | 4 +- tools/python/xen/xend/server/blkif.py | 14 +- tools/python/xen/xend/server/console.py | 26 +- tools/python/xen/xend/server/controller.py | 10 +- tools/python/xen/xend/server/domain.py | 58 --- tools/python/xen/xend/server/event.py | 25 +- tools/python/xen/xend/server/netif.py | 3 - 35 files changed, 1944 insertions(+), 554 deletions(-) create mode 100644 tools/python/xen/web/SrvBase.py create mode 100644 tools/python/xen/web/SrvDir.py create mode 100644 tools/python/xen/web/__init__.py create mode 100644 tools/python/xen/web/connection.py create mode 100644 tools/python/xen/web/defer.py create mode 100644 tools/python/xen/web/http.py create mode 100644 tools/python/xen/web/httpserver.py create mode 100644 tools/python/xen/web/protocol.py create mode 100644 tools/python/xen/web/reactor.py create mode 100644 tools/python/xen/web/resource.py create mode 100644 tools/python/xen/web/static.py create mode 100644 tools/python/xen/web/tcp.py create mode 100644 tools/python/xen/web/unix.py delete mode 100644 tools/python/xen/xend/server/SrvDeviceDir.py delete mode 100644 tools/python/xen/xend/server/domain.py diff --git a/.rootkeys b/.rootkeys index 06baad058b..1d950d5695 100644 --- a/.rootkeys +++ b/.rootkeys @@ -879,6 +879,19 @@ 40c9c468IienauFHQ_xJIcqnPJ8giQ tools/python/xen/util/ip.py 41dde8b0yuJX-S79w4xJKxBQ-Mhp1A tools/python/xen/util/memmap.py 4059c6a0pnxhG8hwSOivXybbGOwuXw tools/python/xen/util/tempfile.py +4267a9b16u4IEPhjRryesk6A17sobA tools/python/xen/web/SrvBase.py +4267a9b1FfCUjW7m9anLERcx9lwhJg tools/python/xen/web/SrvDir.py +4267a9b1uMXIfzB6-81ZLqMCyTgJmw tools/python/xen/web/__init__.py +4267a9b1i_zVq36tt2iQejVuR6DGFw tools/python/xen/web/connection.py +4267a9b1Z2SpO9v-zEDApywETZPDwA tools/python/xen/web/defer.py +4267a9b1KzSWZwWKYrGRc9bUhow_7Q tools/python/xen/web/http.py +4267a9b1KWNZhhmZnySe_nLASwO47g tools/python/xen/web/httpserver.py +4267a9b21miObgEJLAgtLTAKRBK8uQ tools/python/xen/web/protocol.py +4267a9b2pA22-lF37dB7XfapMNroGw tools/python/xen/web/reactor.py +4267a9b2AbH-azu7SXIUETXC39tu-A tools/python/xen/web/resource.py +4267a9b21XhDCpkVXtgea3ko8uS16g tools/python/xen/web/static.py +4267a9b2q7UA0cU5-KATCWX6O-TKsA tools/python/xen/web/tcp.py +4267a9b2XqvzKDWxfAdV22c3mO6NHA tools/python/xen/web/unix.py 40c9c468SNuObE_YWARyS0hzTPSzKg tools/python/xen/xend/Args.py 41597996WNvJA-DVCBmc0xU9w_XmoA tools/python/xen/xend/Blkctl.py 40c9c468Um_qc66OQeLEceIz1pgD5g tools/python/xen/xend/EventServer.py @@ -907,7 +920,6 @@ 40c9c468IxQabrKJSWs0aEjl-27mRQ tools/python/xen/xend/server/SrvConsole.py 40c9c4689Io5bxfbYIfRiUvsiLX0EQ tools/python/xen/xend/server/SrvConsoleDir.py 40c9c468woSmBByfeXA4o_jGf2gCgA tools/python/xen/xend/server/SrvDaemon.py -40c9c468kACsmkqjxBWKHRo071L26w tools/python/xen/xend/server/SrvDeviceDir.py 40c9c468EQZJVkCLds-OhesJVVyZbQ tools/python/xen/xend/server/SrvDir.py 40eee3a0m38EwYXfCSFIjWNwG6jx_A tools/python/xen/xend/server/SrvDmesg.py 40c9c468TyHZUq8sk0FF_vxM6Sozrg tools/python/xen/xend/server/SrvDomain.py @@ -924,7 +936,6 @@ 40c9c469N2-b3GqpLHHHPZykJPLVvA tools/python/xen/xend/server/channel.py 40c9c469hJ_IlatRne-9QEa0-wlquw tools/python/xen/xend/server/console.py 40c9c469UcNJh_NuLU0ytorM0Lk5Ow tools/python/xen/xend/server/controller.py -40d83983OXjt-y3HjSCcuoPp9rzvmw tools/python/xen/xend/server/domain.py 4266169exkN9o3hA8vxe8Er0BZv1Xw tools/python/xen/xend/server/event.py 40c9c469yrm31i60pGKslTi2Zgpotg tools/python/xen/xend/server/messages.py 40c9c46925x-Rjb0Cv2f1-l2jZrPYg tools/python/xen/xend/server/netif.py diff --git a/tools/python/setup.py b/tools/python/setup.py index 81536989ee..0a2fde5f9c 100644 --- a/tools/python/setup.py +++ b/tools/python/setup.py @@ -43,6 +43,7 @@ setup(name = 'xen', 'xen.xend.server', 'xen.sv', 'xen.xm', + 'xen.web', ], ext_package = "xen.lowlevel", ext_modules = [ xc, xu ] diff --git a/tools/python/xen/web/SrvBase.py b/tools/python/xen/web/SrvBase.py new file mode 100644 index 0000000000..6a91df05ba --- /dev/null +++ b/tools/python/xen/web/SrvBase.py @@ -0,0 +1,160 @@ +# Copyright (C) 2004 Mike Wray + +import types + +from xen.xend import sxp +from xen.xend import PrettyPrint +from xen.xend.Args import ArgError +from xen.xend.XendError import XendError +from xen.xend.XendLogging import log + +import resource +import http +import defer + +def uri_pathlist(p): + """Split a path into a list. + p path + return list of path elements + """ + l = [] + for x in p.split('/'): + if x == '': continue + l.append(x) + return l + +class SrvBase(resource.Resource): + """Base class for services. + """ + + + def use_sxp(self, req): + """Determine whether to send an SXP response to a request. + Uses SXP if there is no User-Agent, no Accept, or application/sxp is in Accept. + + req request + returns 1 for SXP, 0 otherwise + """ + ok = 0 + user_agent = req.getHeader('User-Agent') + accept = req.getHeader('Accept') + if (not user_agent) or (not accept) or (accept.find(sxp.mime_type) >= 0): + ok = 1 + return ok + + def get_op_method(self, op): + """Get the method for an operation. + For operation 'foo' looks for 'op_foo'. + + op operation name + returns method or None + """ + op_method_name = 'op_' + op + return getattr(self, op_method_name, None) + + def perform(self, req): + """General operation handler for posted operations. + For operation 'foo' looks for a method op_foo and calls + it with op_foo(op, req). Replies with code 500 if op_foo + is not found. + + The method must return a list when req.use_sxp is true + and an HTML string otherwise (or list). + Methods may also return a Deferred (for incomplete processing). + + req request + """ + op = req.args.get('op') + if op is None or len(op) != 1: + req.setResponseCode(http.NOT_ACCEPTABLE, "Invalid request") + return '' + op = op[0] + op_method = self.get_op_method(op) + if op_method is None: + req.setResponseCode(http.NOT_IMPLEMENTED, "Operation not implemented: " + op) + req.setHeader("Content-Type", "text/plain") + req.write("Operation not implemented: " + op) + return '' + else: + return self._perform(op, op_method, req) + + def _perform(self, op, op_method, req): + try: + val = op_method(op, req) + except Exception, err: + self._perform_err(err, op, req) + return '' + + if isinstance(val, defer.Deferred): + val.addCallback(self._perform_cb, op, req, dfr=1) + val.addErrback(self._perform_err, op, req, dfr=1) + return server.NOT_DONE_YET + else: + self._perform_cb(val, op, req, dfr=0) + return '' + + def _perform_cb(self, val, op, req, dfr=0): + """Callback to complete the request. + May be called from a Deferred. + + @param err: the error + @param req: request causing the error + @param dfr: deferred flag + """ + if isinstance(val, resource.ErrorPage): + req.write(val.render(req)) + elif self.use_sxp(req): + req.setHeader("Content-Type", sxp.mime_type) + sxp.show(val, out=req) + else: + req.write('') + self.print_path(req) + if isinstance(val, types.ListType): + req.write('
')
+                PrettyPrint.prettyprint(val, out=req)
+                req.write('
') + else: + req.write(str(val)) + req.write('') + if dfr: + req.finish() + + def _perform_err(self, err, op, req, dfr=0): + """Error callback to complete a request. + May be called from a Deferred. + + @param err: the error + @param req: request causing the error + @param dfr: deferred flag + """ + if not (isinstance(err, ArgError) or + isinstance(err, sxp.ParseError) or + isinstance(err, XendError)): + if dfr: + return err + else: + raise + #log.exception("op=%s: %s", op, str(err)) + if self.use_sxp(req): + req.setHeader("Content-Type", sxp.mime_type) + sxp.show(['xend.err', str(err)], out=req) + else: + req.setHeader("Content-Type", "text/plain") + req.write('Error ') + req.write(': ') + req.write(str(err)) + if dfr: + req.finish() + + + def print_path(self, req): + """Print the path with hyperlinks. + """ + pathlist = [x for x in req.prepath if x != '' ] + s = "/" + req.write('

/') + for x in pathlist: + s += x + "/" + req.write(' %s/' % (s, x)) + req.write("

") + diff --git a/tools/python/xen/web/SrvDir.py b/tools/python/xen/web/SrvDir.py new file mode 100644 index 0000000000..c8aad9e43e --- /dev/null +++ b/tools/python/xen/web/SrvDir.py @@ -0,0 +1,115 @@ +# Copyright (C) 2004 Mike Wray + +import types + +from xen.xend import sxp +from xen.xend import PrettyPrint +from xen.xend.Args import ArgError +from xen.xend.XendError import XendError +#from xen.xend.XendLogging import log + +import resource +import http + +from xen.web.SrvBase import SrvBase + +class SrvConstructor: + """Delayed constructor for sub-servers. + Does not import the sub-server class or create the object until needed. + """ + + def __init__(self, klass): + """Create a constructor. It is assumed that the class + should be imported as 'import klass from klass'. + + klass name of its class + """ + self.klass = klass + self.obj = None + + def getobj(self): + """Get the sub-server object, importing its class and instantiating it if + necessary. + """ + if not self.obj: + exec 'from xen.xend.server.%s import %s' % (self.klass, self.klass) + klassobj = eval(self.klass) + self.obj = klassobj() + return self.obj + +class SrvDir(SrvBase): + """Base class for directory servlets. + """ + isLeaf = False + + def __init__(self): + SrvBase.__init__(self) + self.table = {} + self.order = [] + + def __repr__(self): + return "" %(id(self), self.table.keys()) + + def noChild(self, msg): + return resource.ErrorPage(http.NOT_FOUND, msg=msg) + + def getChild(self, x, req): + if x == '': return self + try: + val = self.get(x) + except XendError, ex: + return self.noChild(str(ex)) + if val is None: + return self.noChild('Not found: ' + str(x)) + else: + return val + + def get(self, x): + val = self.table.get(x) + if isinstance(val, SrvConstructor): + val = val.getobj() + return val + + def add(self, x, v=None): + if v is None: + v = 'SrvDir' + if isinstance(v, types.StringType): + v = SrvConstructor(v) + self.table[x] = v + self.order.append(x) + return v + + def render_GET(self, req): + try: + if self.use_sxp(req): + req.setHeader("Content-type", sxp.mime_type) + self.ls(req, 1) + else: + req.write('') + self.print_path(req) + self.ls(req) + self.form(req) + req.write('') + return '' + except Exception, ex: + self._perform_err(ex, "GET", req) + + def ls(self, req, use_sxp=0): + url = req.prePathURL() + if not url.endswith('/'): + url += '/' + if use_sxp: + req.write('(ls ') + for k in self.order: + req.write(' ' + k) + req.write(')') + else: + req.write('
    ') + for k in self.order: + v = self.get(k) + req.write('
  • %s
  • ' + % (url, k, k)) + req.write('
') + + def form(self, req): + pass diff --git a/tools/python/xen/web/__init__.py b/tools/python/xen/web/__init__.py new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/tools/python/xen/web/__init__.py @@ -0,0 +1 @@ + diff --git a/tools/python/xen/web/connection.py b/tools/python/xen/web/connection.py new file mode 100644 index 0000000000..77f3b5846a --- /dev/null +++ b/tools/python/xen/web/connection.py @@ -0,0 +1,387 @@ +import sys +import threading +import select +import socket + +from errno import EAGAIN, EINTR, EWOULDBLOCK + +"""General classes to support server and client sockets, without +specifying what kind of socket they are. There are subclasses +for TCP and unix-domain sockets (see tcp.py and unix.py). +""" + +"""We make sockets non-blocking so that operations like accept() +don't block. We also select on a timeout. Otherwise we have no way +of getting the threads to shutdown. +""" +SELECT_TIMEOUT = 2.0 + +class SocketServerConnection: + """An accepted connection to a server. + """ + + def __init__(self, sock, protocol, addr, server): + self.sock = sock + self.protocol = protocol + self.addr = addr + self.server = server + self.buffer_n = 1024 + self.thread = None + self.connected = True + protocol.setTransport(self) + protocol.connectionMade(addr) + + def run(self): + self.thread = threading.Thread(target=self.main) + #self.thread.setDaemon(True) + self.thread.start() + + def main(self): + while True: + if not self.thread: break + if self.select(): break + if not self.thread: break + data = self.read() + if data is None: continue + if data is True: break + if self.dataReceived(data): break + + def select(self): + try: + select.select([self.sock], [], [], SELECT_TIMEOUT) + return False + except socket.error, ex: + if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR): + return False + else: + self.loseConnection(ex) + return True + + def read(self): + try: + data = self.sock.recv(self.buffer_n) + if data == '': + self.loseConnection() + return True + return data + except socket.error, ex: + if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR): + return None + else: + self.loseConnection(ex) + return True + + def dataReceived(self, data): + if not self.protocol: + return True + try: + self.protocol.dataReceived(data) + except SystemExit: + raise + except Exception, ex: + self.disconnect(ex) + return True + return False + + def write(self, data): + self.sock.send(data) + + def loseConnection(self, reason=None): + self.thread = None + self.closeSocket(reason) + self.closeProtocol(reason) + + def closeSocket(self, reason): + try: + self.sock.close() + except SystemExit: + raise + except: + pass + + def closeProtocol(self, reason): + try: + if self.connected: + self.connected = False + if self.protocol: + self.protocol.connectionLost(reason) + except SystemExit: + raise + except: + pass + + def getHost(self): + return self.sock.getsockname() + + def getPeer(self): + return self.addr + +class SocketListener: + """A server socket, running listen in a thread. + Accepts connections and runs a thread for each one. + """ + + def __init__(self, factory, backlog=None): + if backlog is None: + backlog = 5 + self.factory = factory + self.sock = None + self.backlog = backlog + self.thread = None + + def createSocket(self): + raise NotImplementedError() + + def acceptConnection(self, sock, protocol, addr): + return SocketServerConnection(sock, protocol, addr, self) + + def startListening(self): + if self.sock or self.thread: + raise IOError("already listening") + self.sock = self.createSocket() + self.sock.setblocking(0) + self.sock.listen(self.backlog) + self.run() + + def stopListening(self, reason=None): + self.loseConnection(reason) + + def run(self): + self.factory.doStart() + self.thread = threading.Thread(target=self.main) + #self.thread.setDaemon(True) + self.thread.start() + + def main(self): + while True: + if not self.thread: break + if self.select(): break + if self.accept(): break + + def select(self): + try: + select.select([self.sock], [], [], SELECT_TIMEOUT) + return False + except socket.error, ex: + if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR): + return False + else: + self.loseConnection(ex) + return True + + def accept(self): + try: + (sock, addr) = self.sock.accept() + sock.setblocking(0) + return self.accepted(sock, addr) + except socket.error, ex: + if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR): + return False + else: + self.loseConnection(ex) + return True + + def accepted(self, sock, addr): + protocol = self.factory.buildProtocol(addr) + if protocol is None: + self.loseConnection() + return True + connection = self.acceptConnection(sock, protocol, addr) + connection.run() + return False + + def loseConnection(self, reason=None): + self.thread = None + self.closeSocket(reason) + self.closeFactory(reason) + + def closeSocket(self, reason): + try: + self.sock.close() + except SystemExit: + raise + except Exception, ex: + pass + + def closeFactory(self, reason): + try: + self.factory.doStop() + except SystemExit: + raise + except: + pass + +class SocketClientConnection: + """A connection to a server from a client. + + Call connectionMade() on the protocol in a thread when connected. + It is completely up to the protocol what to do. + """ + + def __init__(self, connector): + self.addr = None + self.connector = connector + self.buffer_n = 1024 + self.connected = False + + def createSocket (self): + raise NotImplementedError() + + def write(self, data): + if self.sock: + return self.sock.send(data) + else: + return 0 + + def connect(self, timeout): + #todo: run a timer to cancel on timeout? + try: + sock = self.createSocket() + sock.connect(self.addr) + self.sock = sock + self.connected = True + self.protocol = self.connector.buildProtocol(self.addr) + self.protocol.setTransport(self) + except SystemExit: + raise + except Exception, ex: + self.connector.connectionFailed(ex) + return False + + self.thread = threading.Thread(target=self.main) + #self.thread.setDaemon(True) + self.thread.start() + return True + + def main(self): + try: + # Call the protocol in a thread. + # Up to it what to do. + self.protocol.connectionMade(self.addr) + except SystemExit: + raise + except Exception, ex: + self.disconnect(ex) + + def mainLoop(self): + # Something a protocol could call. + while True: + if not self.thread: break + if self.select(): break + if not self.thread: break + data = self.read() + if data is None: continue + if data is True: break + if self.dataReceived(data): break + + def select(self): + try: + select.select([self.sock], [], [], SELECT_TIMEOUT) + return False + except socket.error, ex: + if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR): + return False + else: + self.disconnect(ex) + return True + + def read(self): + try: + data = self.sock.recv(self.buffer_n) + return data + except socket.error, ex: + if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR): + return None + else: + self.disconnect(ex) + return True + + def dataReceived(self, data): + if not self.protocol: + return True + try: + self.protocol.dataReceived(data) + except SystemExit: + raise + except Exception, ex: + self.disconnect(ex) + return True + return False + + def disconnect(self, reason=None): + self.thread = None + self.closeSocket(reason) + self.closeProtocol(reason) + self.closeConnector(reason) + + def closeSocket(self, reason): + try: + if self.sock: + self.sock.close() + except SystemExit: + raise + except: + pass + + def closeProtocol(self, reason): + try: + if self.connected: + self.connected = False + if self.protocol: + self.protocol.connectionLost(reason) + except SystemExit: + raise + except: + pass + self.protocol = None + + def closeConnector(self, reason): + try: + self.connector.connectionLost(reason) + except SystemExit: + raise + except: + pass + +class SocketConnector: + """A client socket. Connects to a server and runs the client protocol + in a thread. + """ + + def __init__(self, factory): + self.factoryStarted = False + self.factory = factory + self.state = "disconnected" + self.transport = None + + def getDestination(self): + raise NotImplementedError() + + def connectTransport(self): + raise NotImplementedError() + + def connect(self): + if self.state != "disconnected": + raise socket.error(EINVAL, "cannot connect in state " + self.state) + self.state = "connecting" + if not self.factoryStarted: + self.factoryStarted = True + self.factory.doStart() + self.factory.startedConnecting() + self.connectTransport() + + def stopConnecting(self): + if self.state != "connecting": + return + self.state = "disconnected" + self.transport.disconnect() + + def buildProtocol(self, addr): + return self.factory.buildProtocol(addr) + + def connectionLost(self, reason=None): + self.factory.doStop() + + def connectionFailed(self, reason=None): + self.factory.doStop() + diff --git a/tools/python/xen/web/defer.py b/tools/python/xen/web/defer.py new file mode 100644 index 0000000000..c5f1071b72 --- /dev/null +++ b/tools/python/xen/web/defer.py @@ -0,0 +1,3 @@ + +class Deferred: + pass diff --git a/tools/python/xen/web/http.py b/tools/python/xen/web/http.py new file mode 100644 index 0000000000..0fa35c3b68 --- /dev/null +++ b/tools/python/xen/web/http.py @@ -0,0 +1,516 @@ +#============================================================================ +# This library is free software; you can redistribute it and/or +# modify it under the terms of version 2.1 of the GNU Lesser General Public +# License as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +#============================================================================ +# Parts of this library are derived from Twisted: +# Copyright (C) 2001 Matthew W. Lefkowitz +# +# Copyright (C) 2005 Mike Wray +#============================================================================ + +from mimetools import Message +from cStringIO import StringIO +import math +import time +import cgi + +CONTINUE = 100 +SWITCHING_PROTOCOLS = 101 + +OK = 200 +CREATED = 201 +ACCEPTED = 202 +NON_AUTHORITATIVE_INFORMATION = 203 +NO_CONTENT = 204 +RESET_CONTENT = 205 +PARTIAL_CONTENT = 206 +MULTI_STATUS = 207 + +MULTIPLE_CHOICE = 300 +MOVED_PERMANENTLY = 301 +FOUND = 302 +SEE_OTHER = 303 +NOT_MODIFIED = 304 +USE_PROXY = 305 +TEMPORARY_REDIRECT = 307 + +BAD_REQUEST = 400 +UNAUTHORIZED = 401 +PAYMENT_REQUIRED = 402 +FORBIDDEN = 403 +NOT_FOUND = 404 +NOT_ALLOWED = 405 +NOT_ACCEPTABLE = 406 +PROXY_AUTH_REQUIRED = 407 +REQUEST_TIMEOUT = 408 +CONFLICT = 409 +GONE = 410 +LENGTH_REQUIRED = 411 +PRECONDITION_FAILED = 412 +REQUEST_ENTITY_TOO_LARGE = 413 +REQUEST_URI_TOO_LONG = 414 +UNSUPPORTED_MEDIA_TYPE = 415 +REQUESTED_RANGE_NOT_SATISFIABLE = 416 +EXPECTATION_FAILED = 417 + +INTERNAL_SERVER_ERROR = 500 +NOT_IMPLEMENTED = 501 +BAD_GATEWAY = 502 +SERVICE_UNAVAILABLE = 503 +GATEWAY_TIMEOUT = 504 +VERSION_NOT_SUPPORTED = 505 +INSUFFICIENT_STORAGE_SPACE = 507 +NOT_EXTENDED = 510 + +NO_BODY_CODES = [ NO_CONTENT, NOT_MODIFIED ] + + +STATUS = { + CONTINUE : "Continue", + SWITCHING_PROTOCOLS : "Switching protocols", + + OK : "OK", + CREATED : "Created", + ACCEPTED : "Accepted", + NON_AUTHORITATIVE_INFORMATION : "Non-authoritative information", + NO_CONTENT : "No content", + RESET_CONTENT : "Reset content", + PARTIAL_CONTENT : "Partial content", + MULTI_STATUS : "Multi-status", + + MULTIPLE_CHOICE : "Multiple choice", + MOVED_PERMANENTLY : "Moved permanently", + FOUND : "Found", + SEE_OTHER : "See other", + NOT_MODIFIED : "Not modified", + USE_PROXY : "Use proxy", + TEMPORARY_REDIRECT : "Temporary redirect", + + BAD_REQUEST : "Bad request", + UNAUTHORIZED : "Unauthorized", + PAYMENT_REQUIRED : "Payment required", + FORBIDDEN : "Forbidden", + NOT_FOUND : "Not found", + NOT_ALLOWED : "Not allowed", + NOT_ACCEPTABLE : "Not acceptable", + PROXY_AUTH_REQUIRED : "Proxy authentication required", + REQUEST_TIMEOUT : "Request timeout", + CONFLICT : "Conflict", + GONE : "Gone", + LENGTH_REQUIRED : "Length required", + PRECONDITION_FAILED : "Precondition failed", + REQUEST_ENTITY_TOO_LARGE : "Request entity too large", + REQUEST_URI_TOO_LONG : "Request URI too long", + UNSUPPORTED_MEDIA_TYPE : "Unsupported media type", + REQUESTED_RANGE_NOT_SATISFIABLE : "Requested range not satisfiable", + EXPECTATION_FAILED : "Expectation failed", + + INTERNAL_SERVER_ERROR : "Internal server error", + NOT_IMPLEMENTED : "Not implemented", + BAD_GATEWAY : "Bad gateway", + SERVICE_UNAVAILABLE : "Service unavailable", + GATEWAY_TIMEOUT : "Gateway timeout", + VERSION_NOT_SUPPORTED : "HTTP version not supported", + INSUFFICIENT_STORAGE_SPACE : "Insufficient storage space", + NOT_EXTENDED : "Not extended", + } + +def getStatus(code): + return STATUS.get(code, "unknown") + +MULTIPART_FORM_DATA = 'multipart/form-data' +URLENCODED = 'application/x-www-form-urlencoded' + +parseQueryArgs = cgi.parse_qs + +def timegm(year, month, day, hour, minute, second): + """Convert time tuple in GMT to seconds since epoch, GMT""" + EPOCH = 1970 + assert year >= EPOCH + assert 1 <= month <= 12 + days = 365*(year-EPOCH) + calendar.leapdays(EPOCH, year) + for i in range(1, month): + days = days + calendar.mdays[i] + if month > 2 and calendar.isleap(year): + days = days + 1 + days = days + day - 1 + hours = days*24 + hour + minutes = hours*60 + minute + seconds = minutes*60 + second + return seconds + +def stringToDatetime(dateString): + """Convert an HTTP date string to seconds since epoch.""" + parts = dateString.split(' ') + day = int(parts[1]) + month = int(monthname.index(parts[2])) + year = int(parts[3]) + hour, min, sec = map(int, parts[4].split(':')) + return int(timegm(year, month, day, hour, min, sec)) + +class HttpRequest: + + http_version = (1, 1) + + http_version_string = ("HTTP/%d.%d" % http_version) + + max_content_length = 10000 + max_headers = 500 + + request_line = None + request_method = None + request_uri = None + request_path = None + request_query = None + request_version = None + content_length = 0 + content = None + etag = None + close_connection = True + response_code = 200 + response_status = "OK" + response_sent = False + cached = False + last_modified = None + + forceSSL = False + + def __init__(self, host, rin, out): + self.host = host + self.rin = rin + self.out = out + self.request_args = {} + self.args = self.request_args + self.request_headers = {} + self.request_cookies = {} + self.response_headers = {} + self.response_cookies = {} + self.output = StringIO() + self.parseRequest() + + def isSecure(self): + return self.forceSSL + + def getRequestMethod(self): + return self.request_method + + def trim(self, str, ends): + for end in ends: + if str.endswith(end): + str = str[ : -len(end) ] + break + return str + + def requestError(self, code, msg=None): + self.sendError(code, msg) + raise ValueError(self.response_status) + + def sendError(self, code, msg=None): + self.setResponseCode(code, msg=msg) + self.sendResponse() + + def parseRequestVersion(self, version): + try: + if not version.startswith('HTTP/'): + raise ValueError + version_string = version.split('/', 1)[1] + version_codes = version_string.split('.') + if len(version_codes) != 2: + raise ValueError + request_version = (int(version_codes[0]), int(version_codes[1])) + except (ValueError, IndexError): + self.requestError(400, "Bad request version (%s)" % `version`) + + def parseRequestLine(self): + line = self.trim(self.request_line, ['\r\n', '\n']) + line_fields = line.split() + n = len(line_fields) + if n == 3: + [method, uri, version] = line_fields + elif n == 2: + [method, uri] = line_fields + version = 'HTTP/0.9' + else: + self.requestError(BAD_REQUEST, + "Bad request (%s)" % `line`) + + request_version = self.parseRequestVersion(version) + + if request_version > (2, 0): + self.requestError(VERSION_NOT_SUPPORTED, + "HTTP version not supported (%s)" % `version`) + #if request_version >= (1, 1) and self.http_version >= (1, 1): + # self.close_connection = False + #else: + # self.close_connection = True + + self.request_method = method + self.method = method + self.request_uri = uri + self.request_version = version + + uri_query = uri.split('?') + if len(uri_query) == 1: + self.request_path = uri + else: + self.request_path = uri_query[0] + self.request_query = uri_query[1] + self.request_args = parseQueryArgs(self.request_query) + self.args = self.request_args + + + def parseRequestHeaders(self): + header_bytes = "" + header_count = 0 + while True: + if header_count >= self.max_headers: + self.requestError(BAD_REQUEST, + "Bad request (too many headers)") + line = self.rin.readline() + header_bytes += line + header_count += 1 + if line == '\r\n' or line == '\n' or line == '': + break + #print 'parseRequestHeaders>', header_bytes + header_input = StringIO(header_bytes) + self.request_headers = Message(header_input) + + def parseRequestCookies(self): + cookie_hdr = self.getHeader("cookie") + if not cookie_hdr: return + for cookie in cookie_hdr.split(';'): + try: + cookie = cookie.lstrip() + (k, v) = cookie.split('=', 1) + self.request_cookies[k] = v + except ValueError: + pass + + def parseRequestArgs(self): + if ((self.content is None) or + (self.request_method != "POST")): + return + content_type = self.getHeader('content-type') + if not content_type: + return + (encoding, params) = cgi.parse_header(content_type) + if encoding == URLENCODED: + xargs = cgi.parse_qs(self.content.getvalue(), + keep_blank_values=True) + elif encoding == MULTIPART_FORM_DATA: + xargs = cgi.parse_multipart(self.content, params) + else: + xargs = {} + self.request_args.update(xargs) + + def getCookie(self, k): + return self.request_cookies[k] + + def readContent(self): + try: + self.content_length = int(self.getHeader("Content-Length")) + except: + return + if self.content_length > self.max_content_length: + self.requestError(REQUEST_ENTITY_TOO_LARGE) + self.content = self.rin.read(self.content_length) + self.content = StringIO(self.content) + self.content.seek(0,0) + + def parseRequest(self): + #print 'parseRequest>' + self.request_line = self.rin.readline() + self.parseRequestLine() + self.parseRequestHeaders() + self.parseRequestCookies() + connection_mode = self.getHeader('Connection') + self.setCloseConnection(connection_mode) + self.readContent() + self.parseRequestArgs() + #print 'parseRequest<' + + def setCloseConnection(self, mode): + if not mode: return + mode = mode.lower() + if mode == 'close': + self.close_connection = True + elif (mode == 'keep-alive') and (self.http_version >= (1, 1)): + self.close_connection = False + #print 'setCloseConnection>', mode, self.close_connection + + def getHeader(self, k, v=None): + return self.request_headers.get(k, v) + + def getRequestMethod(self): + return self.request_method + + def getRequestPath(self): + return self.request_path + + def setResponseCode(self, code, status=None, msg=None): + self.response_code = code + if not status: + status = getStatus(code) + self.response_status = status + + def setResponseHeader(self, k, v): + #print 'setResponseHeader>', k, v + k = k.lower() + self.response_headers[k] = v + if k == 'connection': + self.setCloseConnection(v) + + setHeader = setResponseHeader + + def setLastModified(self, when): + # time.time() may be a float, but the HTTP-date strings are + # only good for whole seconds. + when = long(math.ceil(when)) + if (not self.last_modified) or (self.last_modified < when): + self.lastModified = when + + modified_since = self.getHeader('if-modified-since') + if modified_since: + modified_since = stringToDatetime(modified_since) + if modified_since >= when: + self.setResponseCode(NOT_MODIFIED) + self.cached = True + + def setContentType(self, ty): + self.setResponseHeader("Content-Type", ty) + + def setEtag(self, etag): + if etag: + self.etag = etag + + tags = self.getHeader("if-none-match") + if tags: + tags = tags.split() + if (etag in tags) or ('*' in tags): + if self.request_method in ("HEAD", "GET"): + code = NOT_MODIFIED + else: + code = PRECONDITION_FAILED + self.setResponseCode(code) + self.cached = True + + def addCookie(self, k, v, expires=None, domain=None, path=None, + max_age=None, comment=None, secure=None): + cookie = v + if expires != None: + cookie += "; Expires=%s" % expires + if domain != None: + cookie += "; Domain=%s" % domain + if path != None: + cookie += "; Path=%s" % path + if max_age != None: + cookie += "; Max-Age=%s" % max_age + if comment != None: + cookie += "; Comment=%s" % comment + if secure: + cookie += "; Secure" + self.response_cookies[k] = cookie + + def sendResponseHeaders(self): + if self.etag: + self.setResponseHeader("ETag", self.etag) + for (k, v) in self.response_headers.items(): + self.send("%s: %s\r\n" % (k.capitalize(), v)) + for (k, v) in self.response_cookies.items(): + self.send("Set-Cookie: %s=%s\r\n" % (k, v)) + self.send("\r\n") + + def sendResponse(self): + #print 'sendResponse>' + if self.response_sent: + return + self.response_sent = True + send_body = self.hasBody() + if not self.close_connection: + self.setResponseHeader("Connection", "keep-alive") + if send_body: + self.output.seek(0, 0) + body = self.output.getvalue() + body_length = len(body) + #print 'sendResponse> body=', body_length, body + self.setResponseHeader("Content-Length", body_length) + if self.http_version > (0, 9): + self.send("%s %d %s\r\n" % (self.http_version_string, + self.response_code, + self.response_status)) + self.sendResponseHeaders() + if send_body: + #print 'sendResponse> writing body' + self.send(body) + + def write(self, data): + #print 'write>', data + self.output.write(data) + + def send(self, data): + #print 'send>', len(data), '|%s|' % data + self.out.write(data) + + def hasNoBody(self): + return ((self.request_method == "HEAD") or + (self.response_code in NO_BODY_CODES) or + self.cached) + + def hasBody(self): + return not self.hasNoBody() + + def process(self): + pass + return self.close_connection + + def getRequestHostname(self): + """Get the hostname that the user passed in to the request. + + Uses the 'Host:' header if it is available, and the + host we are listening on otherwise. + """ + return (self.getHeader('host') or + socket.gethostbyaddr(self.getHostAddr())[0] + ).split(':')[0] + + def getHost(self): + return self.host + + def getHostAddr(self): + return self.host[0] + + def getPort(self): + return self.host[1] + + def setHost(self, host, port, ssl=0): + """Change the host and port the request thinks it's using. + + This method is useful for working with reverse HTTP proxies (e.g. + both Squid and Apache's mod_proxy can do this), when the address + the HTTP client is using is different than the one we're listening on. + + For example, Apache may be listening on https://www.example.com, and then + forwarding requests to http://localhost:8080, but we don't want HTML produced + to say 'http://localhost:8080', they should say 'https://www.example.com', + so we do:: + + request.setHost('www.example.com', 443, ssl=1) + + """ + self.forceSSL = ssl + self.received_headers["host"] = host + self.host = (host, port) + + + diff --git a/tools/python/xen/web/httpserver.py b/tools/python/xen/web/httpserver.py new file mode 100644 index 0000000000..4d446bedab --- /dev/null +++ b/tools/python/xen/web/httpserver.py @@ -0,0 +1,144 @@ +import string +import socket +from urllib import quote, unquote + +import http +from SrvDir import SrvDir + +class HttpServerRequest(http.HttpRequest): + + def __init__(self, server, addr, srd, srw): + #print 'HttpServerRequest>', addr + self.server = server + self.prepath = '' + http.HttpRequest.__init__(self, addr, srd, srw) + + def process(self): + #print 'HttpServerRequest>process', 'path=', self.request_path + self.prepath = [] + self.postpath = map(unquote, string.split(self.request_path[1:], '/')) + res = self.getResource() + self.render(res) + self.sendResponse() + return self.close_connection + + def prePathURL(self): + url_host = self.getRequestHostname() + port = self.getPort() + if self.isSecure(): + url_proto = "https" + default_port = 443 + else: + url_proto = "http" + default_port = 80 + if port != default_port: + url_host += (':%d' % port) + url_path = quote(string.join(self.prepath, '/')) + return ('%s://%s/%s' % (url_proto, url_host, url_path)) + + def getResource(self): + return self.server.getResource(self) + + def render(self, res): + #print 'HttpServerRequest>render', res + if res is None: + self.sendError(http.NOT_FOUND) + else: + res.render(self) + +class HttpServer: + + request_queue_size = 5 + + def __init__(self, interface='', port=8080, root=None): + if root is None: + root = SrvDir() + self.interface = interface + self.port = port + self.closed = False + self.root = root + + def getRoot(self): + return self.root + + def getPort(self): + return self.port + + def run(self): + self.bind() + self.listen() + self.requestLoop() + + def stop(self): + self.close() + + def bind(self): + #print 'bind>', self.interface, self.port + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.socket.bind((self.interface, self.port)) + + def listen(self): + self.socket.listen(self.request_queue_size) + + def accept(self): + return self.socket.accept() + + def requestLoop(self): + while not self.closed: + self.acceptRequest() + + def close(self): + self.closed = True + try: + self.socket.close() + except: + pass + + def acceptRequest(self): + #print 'acceptRequest>' + try: + (sock, addr) = self.accept() + #print 'acceptRequest>', sock, addr + self.processRequest(sock, addr) + except socket.error: + return + + def processRequest(self, sock, addr): + #print 'processRequest>', sock, addr + srd = sock.makefile('rb') + srw = sock.makefile('wb') + srvaddr = (socket.gethostname(), self.port) + while True: + #print 'HttpServerRequest...' + req = HttpServerRequest(self, srvaddr, srd, srw) + close = req.process() + srw.flush() + #print 'HttpServerRequest close=', close + if close: + break + try: + #print 'close...' + sock.close() + except: + pass + #print 'processRequest<', sock, addr + + def getResource(self, req): + return self.root.getRequestResource(req) + + +def main(): + root = SrvDir() + a = root.add("a", SrvDir()) + b = root.add("b", SrvDir()) + server = HttpServer(root=root) + server.run() + +if __name__ == "__main__": + main() + + + + + diff --git a/tools/python/xen/web/protocol.py b/tools/python/xen/web/protocol.py new file mode 100644 index 0000000000..84b493cd47 --- /dev/null +++ b/tools/python/xen/web/protocol.py @@ -0,0 +1,94 @@ +class Factory: + + def __init__(self): + pass + + def startedConnecting(self): + print 'ServerProtocolFactory>startedConnecting>' + pass + + def doStart(self): + print 'ServerProtocolFactory>doStart>' + pass + + def doStop(self): + print 'ServerProtocolFactory>doStop>' + pass + + def buildProtocol(self, addr): + print 'ServerProtocolFactory>buildProtocol>', addr + return Protocol(self) + +class ServerFactory(Factory): + pass + +class ClientFactory(Factory): + pass + +class Protocol: + + factory = None + transport = None + connected = False + + def __init__(self, factory): + self.factory = factory + + def setTransport(self, transport): + self.transport = transport + self.connected = bool(transport) + + def getTransport(self): + return self.transport + + def connectionMade(self, addr): + print 'Protocol>connectionMade>', addr + pass + + def connectionLost(self, reason=None): + print 'Protocol>connectionLost>', reason + pass + + def dataReceived(self, data): + print 'Protocol>dataReceived>' + pass + + def write(self, data): + if self.transport: + return self.transport.write(data) + else: + return 0 + + def read(self): + if self.transport: + return self.transport.read() + else: + return None + +class TestClientFactory(Factory): + + def buildProtocol(self, addr): + print 'TestClientProtocolFactory>buildProtocol>', addr + return TestClientProtocol(self) + +class TestClientProtocol(Protocol): + + def connectionMade(self, addr): + print 'TestProtocol>connectionMade>', addr + self.write("hello") + self.write("there") + +class TestServerFactory(Factory): + + def buildProtocol(self, addr): + print 'TestServerProtocolFactory>buildProtocol>', addr + return TestServerProtocol(self) + +class TestServerProtocol(Protocol): + + def dataReceived(self, data): + print 'TestServerProtocol>dataReceived>', len(data), data + #sys.exit(0) + import os + os._exit(0) + diff --git a/tools/python/xen/web/reactor.py b/tools/python/xen/web/reactor.py new file mode 100644 index 0000000000..aa8e0c0646 --- /dev/null +++ b/tools/python/xen/web/reactor.py @@ -0,0 +1,9 @@ +from threading import Timer + +from unix import listenUNIX, connectUNIX +from tcp import listenTCP, connectTCP + +def callLater(_delay, _fn, *args, **kwds): + timer = Timer(_delay, _fn, args=args, kwargs=kwds) + timer.start() + return timer diff --git a/tools/python/xen/web/resource.py b/tools/python/xen/web/resource.py new file mode 100644 index 0000000000..3b5e745671 --- /dev/null +++ b/tools/python/xen/web/resource.py @@ -0,0 +1,91 @@ +import http + +def findResource(resource, request): + """Traverse resource tree to find who will handle the request.""" + while request.postpath and not resource.isLeaf: + #print 'findResource:', resource, request.postpath + pathElement = request.postpath.pop(0) + request.prepath.append(pathElement) + next = resource.getPathResource(pathElement, request) + if not next: break + resource = next + return resource + +class Resource: + + isLeaf = False + + def __init__(self): + self.children = {} + + def getRequestResource(self, req): + return findResource(self, req) + + def getChild(self, path, request): + return None + + def getPathResource(self, path, request): + #print 'getPathResource>', self, path + if self.children.has_key(path): + val = self.children[path] + else: + val = self.getChild(path, request) + #print 'getPathResource<', val + return val + + def putChild(self, path, child): + self.children[path] = child + #child.server = self.server + + def render(self, req): + meth = getattr(self, 'render_' + req.getRequestMethod(), self.unsupported) + return meth(req) + + def supportedMethods(self): + l = [] + s = 'render_' + for x in dir(self): + if x.startswith(s): + l.append(x[len(s):]) + return l + + def render_HEAD(self, req): + return self.render_GET(req) + + def render_GET(self, req): + req.setContentType("text/plain") + req.write("GET") + + def render_POST(self, req): + req.setContentType("text/plain") + req.write("POST") + + def unsupported(self, req): + req.setHeader("Accept", ",".join(self.supportedMethods())) + req.setResponseCode(http.NOT_IMPLEMENTED) + req.setContentType("text/plain") + req.write("Request method not supported (%s)" % req.getRequestMethod()) + +class ErrorPage(Resource): + + isLeaf = True + + def __init__(self, code, status=None, msg=None): + Resource.__init__(self) + if status is None: + status = http.getStatus(code) + if msg is None: + msg = status + self.code = code + self.status = status + self.msg = msg + + def render(self, req): + req.setResponseCode(self.code, self.status) + req.setContentType("text/plain") + req.write(self.msg) + + + + + diff --git a/tools/python/xen/web/static.py b/tools/python/xen/web/static.py new file mode 100644 index 0000000000..3bed394e77 --- /dev/null +++ b/tools/python/xen/web/static.py @@ -0,0 +1,46 @@ +import os + +from resource import Resource + +class File(Resource): + + isLeaf = True + + def __init__(self, filename, defaultType=None): + if defaultType is None: + defaultType = "text/plain" + self.filename = filename + self.type = defaultType + self.encoding = None + + def getFileSize(self): + try: + info = os.stat(self.filename) + return info.st_size + except: + return 0 + + def render(self, req): + if self.type: + req.setHeader('Content-Type', self.type) + if self.encoding: + rew.setHeader('Content-Encoding', self.encoding) + req.setHeader('Content-Length', self.getFileSize()) + try: + io = file(self.filename, "r") + while True: + buf = io.read(1024) + if not buf: + break + req.write(buf) + except IOError: + pass + try: + if io: + io.close() + except: + pass + return '' + + + diff --git a/tools/python/xen/web/tcp.py b/tools/python/xen/web/tcp.py new file mode 100644 index 0000000000..01a8e73865 --- /dev/null +++ b/tools/python/xen/web/tcp.py @@ -0,0 +1,90 @@ +import sys +import socket +import types + +from connection import * +from protocol import * + +class TCPServerConnection(SocketServerConnection): + pass + +class TCPListener(SocketListener): + + def __init__(self, port, factory, backlog=None, interface=''): + SocketListener.__init__(self, factory, backlog=backlog) + self.port = port + self.interface = interface + + def createSocket(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + addr = (self.interface, self.port) + sock.bind(addr) + return sock + + def acceptConnection(self, sock, protocol, addr): + return TCPServerConnection(sock, protocol, addr, self) + +class TCPClientConnection(SocketClientConnection): + + def __init__(self, host, port, bindAddress, connector): + SocketClientConnection.__init__(self, connector) + self.addr = (host, port) + self.bindAddress = bindAddress + + def createSocket(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + if self.bindAddress is not None: + sock.bind(self.bindAddress) + return sock + +class TCPConnector(SocketConnector): + + def __init__(self, host, port, factory, timeout=None, bindAddress=None): + SocketConnector.__init__(self, factory) + self.host = host + self.port = self.servicePort(port) + self.bindAddress = bindAddress + self.timeout = timeout + + def servicePort(self, port): + if isinstance(port, types.StringTypes): + try: + port = socket.getservbyname(port, 'tcp') + except socket.error, ex: + raise IOError("unknown service: " + ex) + return port + + def getDestination(self): + return (self.host, self.port) + + def connectTransport(self): + self.transport = TCPClientConnection( + self.host, self.port, self.bindAddress, self) + self.transport.connect(self.timeout) + +def listenTCP(port, factory, interface='', backlog=None): + l = TCPListener(port, factory, interface=interface, backlog=backlog) + l.startListening() + return l + +def connectTCP(host, port, factory, timeout=None, bindAddress=None): + c = TCPConnector(host, port, factory, timeout=timeout, bindAddress=bindAddress) + c.connect() + return c + +def main(argv): + host = 'localhost' + port = 8005 + if argv[1] == "client": + c = connectTCP(host, port, TestClientFactory()) + print 'client:', c + else: + s = listenTCP(port, TestServerFactory()) + print 'server:', s + +if __name__ == "__main__": + main(sys.argv) + + + diff --git a/tools/python/xen/web/unix.py b/tools/python/xen/web/unix.py new file mode 100644 index 0000000000..d82ed0a6c3 --- /dev/null +++ b/tools/python/xen/web/unix.py @@ -0,0 +1,76 @@ +import sys +import socket +import os + +from connection import * +from protocol import * + +class UnixServerConnection(SocketServerConnection): + pass + +class UnixListener(SocketListener): + + def __init__(self, path, factory, backlog=None): + SocketListener.__init__(self, factory, backlog=backlog) + self.path = path + + def createSocket(self): + try: + os.unlink(self.path) + except SystemExit: + raise + except Exception, ex: + pass + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.bind(self.path) + return sock + + def acceptConnection(self, sock, protocol, addr): + return UnixServerConnection(sock, protocol, addr, self) + +class UnixClientConnection(SocketClientConnection): + + def __init__(self, addr, connector): + SocketClientConnection.__init__(self, connector) + self.addr = addr + + def createSocket(self): + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + return sock + +class UnixConnector(SocketConnector): + + def __init__(self, path, factory, timeout=None): + SocketConnector.__init__(self, factory) + self.addr = path + self.timeout = timeout + + def getDestination(self): + return self.addr + + def connectTransport(self): + self.transport = UnixClientConnection(self.addr, self) + self.transport.connect(self.timeout) + +def listenUNIX(path, factory, backlog=None): + l = UnixListener(path, factory, backlog=backlog) + l.startListening() + return l + +def connectUNIX(path, factory, timeout=None): + c = UnixConnector(path, factory, timeout=timeout) + c.connect() + return c + +def main(argv): + path = "/tmp/test-foo" + if argv[1] == "client": + c = connectUNIX(path, TestClientFactory()) + print "client:", c + else: + s = listenUNIX(path, TestServeractory()) + print "server:", s + +if __name__ == "__main__": + main(sys.argv) + diff --git a/tools/python/xen/xend/EventServer.py b/tools/python/xen/xend/EventServer.py index 20c567ada7..13bb7a3daa 100644 --- a/tools/python/xen/xend/EventServer.py +++ b/tools/python/xen/xend/EventServer.py @@ -3,8 +3,10 @@ """ import string +from threading import Lock -from twisted.internet import reactor +#from twisted.internet import reactor +from xen.web import reactor # subscribe a.b.c h: map a.b.c -> h # subscribe a.b.* h: map a.b.* -> h @@ -38,20 +40,30 @@ class EventServer: self.handlers = {} self.run = run self.queue = [] + self.lock = Lock() def start(self): """Enable event handling. Sends any queued events. """ - self.run = 1 - for (e,v) in self.queue: + try: + self.lock.acquire() + self.run = 1 + queue = self.queue + self.queue = [] + finally: + self.lock.release() + for (e,v) in queue: self.inject(e, v) - self.queue = [] def stop(self): """Suspend event handling. Events injected while suspended are queued until we are started again. """ - self.run = 0 + try: + self.lock.acquire() + self.run = 0 + finally: + self.lock.release() def subscribe(self, event, handler): """Subscribe to an event. For example 'a.b.c.d'. @@ -62,21 +74,29 @@ class EventServer: event event name handler event handler fn(event, val) """ - hl = self.handlers.get(event) - if hl is None: - self.handlers[event] = [handler] - else: - hl.append(handler) + try: + self.lock.acquire() + hl = self.handlers.get(event) + if hl is None: + self.handlers[event] = [handler] + else: + hl.append(handler) + finally: + self.lock.release() def unsubscribe_all(self, event=None): """Unsubscribe all handlers for a given event, or all handlers. event event (optional) """ - if event == None: - self.handlers.clear() - elif event in self.handlers: - del self.handlers[event] + try: + self.lock.acquire() + if event == None: + self.handlers.clear() + elif event in self.handlers: + del self.handlers[event] + finally: + self.lock.release() def unsubscribe(self, event, handler): """Unsubscribe a given event and handler. @@ -84,11 +104,15 @@ class EventServer: event event handler handler """ - hl = self.handlers.get(event) - if hl is None: - return - if handler in hl: - hl.remove(handler) + try: + self.lock.acquire() + hl = self.handlers.get(event) + if hl is None: + return + if handler in hl: + hl.remove(handler) + finally: + self.lock.release() def inject(self, event, val, async=1): """Inject an event. Handlers for it are called if running, otherwise @@ -97,13 +121,18 @@ class EventServer: event event type val event value """ - if self.run: - if async: - reactor.callLater(0, self.call_handlers, event, val) - else: - self.notify_handlers(event, val) + try: + self.lock.acquire() + if not self.run: + self.queue.append( (event, val) ) + return + finally: + self.lock.release() + + if async: + reactor.callLater(0, self.call_handlers, event, val) else: - self.queue.append( (event, val) ) + self.notify_handlers(event, val) def call_handlers(self, event, val): """Internal method to call event handlers. @@ -121,13 +150,19 @@ class EventServer: event event type val event value """ - hl = self.handlers.get(key) - if hl is None: - return - # Copy the handler list so that handlers can call - # subscribe/unsubscribe safely - python list iteration - # is not safe against list modification. - for h in hl[:]: + try: + self.lock.acquire() + hl = self.handlers.get(key) + if hl is None: + return + # Copy the handler list so that handlers can call + # subscribe/unsubscribe safely - python list iteration + # is not safe against list modification. + hl = hl[:] + finally: + self.lock.release() + # Must not hold the lock while calling the handlers. + for h in hl: try: h(event, val) except: diff --git a/tools/python/xen/xend/XendDomain.py b/tools/python/xen/xend/XendDomain.py index 23077a55e4..1f3568e21f 100644 --- a/tools/python/xen/xend/XendDomain.py +++ b/tools/python/xen/xend/XendDomain.py @@ -201,6 +201,7 @@ class XendDomain: if domid in doms: try: self._new_domain(config, doms[domid]) + self.update_domain(domid) except Exception, ex: log.exception("Error recreating domain info: id=%s", domid) self._delete_domain(domid) @@ -301,11 +302,11 @@ class XendDomain: destroyed = 0 for d in casualties: id = str(d['dom']) - print 'reap>', id + #print 'reap>', id dominfo = self.domain_by_id.get(id) name = (dominfo and dominfo.name) or '??' if dominfo and dominfo.is_terminated(): - print 'reap> already terminated:', id + #print 'reap> already terminated:', id continue log.debug('XendDomain>reap> domain died name=%s id=%s', name, id) if d['shutdown']: @@ -725,9 +726,9 @@ class XendDomain: @param devconfig: device configuration """ dominfo = self.domain_lookup(id) - self.refresh_schedule() val = dominfo.device_create(devconfig) self.update_domain(dominfo.id) + self.refresh_schedule() return val def domain_device_configure(self, id, devconfig, idx): @@ -739,9 +740,9 @@ class XendDomain: @return: updated device configuration """ dominfo = self.domain_lookup(id) - self.refresh_schedule() val = dominfo.device_configure(devconfig, idx) self.update_domain(dominfo.id) + self.refresh_schedule() return val def domain_device_refresh(self, id, type, idx): @@ -752,9 +753,9 @@ class XendDomain: @param type: device type """ dominfo = self.domain_lookup(id) - self.refresh_schedule() val = dominfo.device_refresh(type, idx) self.update_domain(dominfo.id) + self.refresh_schedule() return val def domain_device_destroy(self, id, type, idx): @@ -765,9 +766,9 @@ class XendDomain: @param type: device type """ dominfo = self.domain_lookup(id) - self.refresh_schedule() val = dominfo.device_destroy(type, idx) self.update_domain(dominfo.id) + self.refresh_schedule() return val def domain_devtype_ls(self, id, type): @@ -778,7 +779,7 @@ class XendDomain: @return: device indexes """ dominfo = self.domain_lookup(id) - return dominfo.get_devices(type) + return dominfo.getDeviceIndexes(type) def domain_devtype_get(self, id, type, idx): """Get a device from a domain. @@ -789,16 +790,16 @@ class XendDomain: @return: device object (or None) """ dominfo = self.domain_lookup(id) - return dominfo.get_device_by_index(type, idx) + return dominfo.getDeviceByIndex(type, idx) def domain_vif_credit_limit(self, id, vif, credit, period): """Limit the vif's transmission rate """ dominfo = self.domain_lookup(id) - try: - return dominfo.limit_vif(vif, credit, period) - except Exception, ex: - raise XendError(str(ex)) + dev = dominfo.getDeviceById('vif', vif) + if not dev: + raise XendError("invalid vif") + return dev.setCreditLimit(credit, period) def domain_vif_ls(self, id): """Get list of virtual network interface (vif) indexes for a domain. diff --git a/tools/python/xen/xend/XendDomainInfo.py b/tools/python/xen/xend/XendDomainInfo.py index d576606890..c5f7ae3d99 100644 --- a/tools/python/xen/xend/XendDomainInfo.py +++ b/tools/python/xen/xend/XendDomainInfo.py @@ -4,7 +4,7 @@ Includes support for domain construction, using open-ended configurations. -Author: Mike Wray +Author: Mike Wray """ @@ -25,16 +25,9 @@ import sxp from XendLogging import log from XendError import VmError from XendRoot import get_component -#import XendConsole; xendConsole = XendConsole.instance() from PrettyPrint import prettyprint -"""The length of domain names that Xen can handle. -The names stored in Xen itself are not used for much, and -xend can handle domain names of any length. -""" -MAX_DOMAIN_NAME = 15 - """Flag for a block device backend domain.""" SIF_BLK_BE_DOMAIN = (1<<4) @@ -279,7 +272,6 @@ class XendDomainInfo: self.channel = None self.controllers = {} - self.devices = {} self.configs = [] @@ -386,10 +378,6 @@ class XendDomainInfo: ctrl = self.getDeviceController(type) return ctrl.getDeviceByIndex(idx) - def getDeviceIndex(self, type, dev): - ctrl = self.getDeviceController(type) - return ctrl.getDeviceIndex(dev) - def getDeviceConfig(self, type, id): ctrl = self.getDeviceController(type) return ctrl.getDeviceConfig(id) @@ -398,6 +386,10 @@ class XendDomainInfo: ctrl = self.getDeviceController(type) return ctrl.getDeviceIds() + def getDeviceIndexes(self, type): + ctrl = self.getDeviceController(type) + return ctrl.getDeviceIndexes() + def getDeviceConfigs(self, type): ctrl = self.getDeviceController(type) return ctrl.getDeviceConfigs() @@ -451,16 +443,19 @@ class XendDomainInfo: return sxpr def sxpr_devices(self): - sxpr = ['devices'] + sxpr = [] for ty in self.getDeviceTypes(): - devs = [ ty ] - devs += self.getDeviceSxprs(ty) - sxpr.append(devs) + devs = self.getDeviceSxprs(ty) + sxpr += devs + if sxpr: + sxpr.insert(0, 'devices') + else: + sxpr = None return sxpr def check_name(self, name): - """Check if a vm name is valid. Valid names start with a non-digit - and contain alphabetic characters, digits, or characters in '_-.:/+'. + """Check if a vm name is valid. Valid names contain alphabetic characters, + digits, or characters in '_-.:/+'. The same name cannot be used for more than one vm at the same time. @param name: name @@ -469,8 +464,6 @@ class XendDomainInfo: if self.recreate: return if name is None or name == '': raise VmError('missing vm name') - if name[0] in string.digits: - raise VmError('invalid vm name') for c in name: if c in string.digits: continue if c in '_-.:/+': continue @@ -585,14 +578,11 @@ class XendDomainInfo: val = None if self.savedinfo is None: return val - devinfo = sxp.child(self.savedinfo, 'devices') - if devinfo is None: - return val - devs = sxp.child(devinfo, type) - if devs is None: + devices = sxp.child(self.savedinfo, 'devices') + if devices is None: return val index = str(index) - for d in sxp.children(devs): + for d in sxp.children(devices, type): dindex = sxp.child_value(d, 'index') if dindex is None: continue if str(dindex) == index: @@ -603,19 +593,6 @@ class XendDomainInfo: def get_device_recreate(self, type, index): return self.get_device_savedinfo(type, index) or self.recreate - def limit_vif(self, vif, credit, period): - """Limit the rate of a virtual interface - @param vif: vif - @param credit: vif credit in bytes - @param period: vif period in uSec - @return: 0 on success - """ - #todo: all wrong - #ctrl = xend.netif_create(self.dom, recreate=self.recreate) - #d = ctrl.limitDevice(vif, credit, period) - #return d - pass - def add_config(self, val): """Add configuration data to a virtual machine. @@ -662,8 +639,6 @@ class XendDomainInfo: if ctrl.isDestroyed(): continue ctrl.destroyController(reboot=reboot) if not reboot: - self.devices = {} - self.device_index = {} self.configs = [] self.ipaddrs = [] @@ -674,11 +649,6 @@ class XendDomainInfo: print "image:" sxp.show(self.image) print - for dl in self.devices: - for dev in dl: - print "device:" - sxp.show(dev) - print for val in self.configs: print "config:" sxp.show(val) @@ -1011,9 +981,9 @@ class XendDomainInfo: at creation time, for example when it uses NFS root. """ - blkif = self.getDeviceController("blkif", error=False) + blkif = self.getDeviceController("vbd", error=False) if not blkif: - blkif = self.createDeviceController("blkif") + blkif = self.createDeviceController("vbd") backend = blkif.getBackend(0) backend.connect(recreate=self.recreate) @@ -1210,19 +1180,19 @@ from server import console controller.addDevControllerClass("console", console.ConsoleController) from server import blkif -controller.addDevControllerClass("blkif", blkif.BlkifController) -add_device_handler("vbd", "blkif") +controller.addDevControllerClass("vbd", blkif.BlkifController) +add_device_handler("vbd", "vbd") from server import netif -controller.addDevControllerClass("netif", netif.NetifController) -add_device_handler("vif", "netif") +controller.addDevControllerClass("vif", netif.NetifController) +add_device_handler("vif", "vif") from server import pciif -controller.addDevControllerClass("pciif", pciif.PciController) -add_device_handler("pci", "pciif") +controller.addDevControllerClass("pci", pciif.PciController) +add_device_handler("pci", "pci") from xen.xend.server import usbif -controller.addDevControllerClass("usbif", usbif.UsbifController) -add_device_handler("usb", "usbif") +controller.addDevControllerClass("usb", usbif.UsbifController) +add_device_handler("usb", "usb") #============================================================================ diff --git a/tools/python/xen/xend/scheduler.py b/tools/python/xen/xend/scheduler.py index f60ebab25b..3ddb527185 100644 --- a/tools/python/xen/xend/scheduler.py +++ b/tools/python/xen/xend/scheduler.py @@ -6,18 +6,20 @@ class Scheduler: self.lock = threading.Lock() self.schedule = {} - def later(self, _delay, _name, _fn, args): + def later(self, _delay, _name, _fn, args, kwargs={}): """Schedule a function to be called later (if not already scheduled). @param _delay: delay in seconds @param _name: schedule name @param _fn: function - @param args: arguments + @param args: arguments (list) + @param kwargs keyword arguments (map) """ try: self.lock.acquire() if self.schedule.get(_name): return - timer = threading.Timer(_delay, _fn, args=args) + runargs = [ _name, _fn, args, kwargs ] + timer = threading.Timer(_delay, self._run, args=runargs) self.schedule[_name] = timer finally: self.lock.release() @@ -28,14 +30,22 @@ class Scheduler: @param name: schedule name to cancel """ + timer = self._remove(name) + if timer: + timer.cancel() + + def _remove(self, name): try: self.lock.acquire() timer = self.schedule.get(name) - if not timer: - return - del self.schedule[name] + if timer: + del self.schedule[name] + return timer finally: self.lock.release() - timer.cancel() + + def _run(self, name, fn, args, kwargs): + self._remove(name) + fn(*args, **kwargs) diff --git a/tools/python/xen/xend/server/SrvBase.py b/tools/python/xen/xend/server/SrvBase.py index 5990733d75..9ad1af2ef3 100644 --- a/tools/python/xen/xend/server/SrvBase.py +++ b/tools/python/xen/xend/server/SrvBase.py @@ -1,185 +1,2 @@ # Copyright (C) 2004 Mike Wray - -import cgi - -import os -import sys -import types -import StringIO - -from twisted.internet import defer -from twisted.internet import reactor -from twisted.protocols import http -from twisted.web import error -from twisted.web import resource -from twisted.web import server -from twisted.python.failure import Failure - -from xen.xend import sxp -from xen.xend import PrettyPrint -from xen.xend.Args import ArgError -from xen.xend.XendError import XendError -from xen.xend.XendLogging import log - -def uri_pathlist(p): - """Split a path into a list. - p path - return list of path elements - """ - l = [] - for x in p.split('/'): - if x == '': continue - l.append(x) - return l - -class SrvBase(resource.Resource): - """Base class for services. - """ - - def parse_form(self, req, method): - """Parse the data for a request, GET using the URL, POST using encoded data. - Posts should use enctype='multipart/form-data' in the
tag, - rather than 'application/x-www-form-urlencoded'. Only 'multipart/form-data' - handles file upload. - - req request - returns a cgi.FieldStorage instance - """ - env = {} - env['REQUEST_METHOD'] = method - if self.query: - env['QUERY_STRING'] = self.query - val = cgi.FieldStorage(fp=req.rfile, headers=req.headers, environ=env) - return val - - def use_sxp(self, req): - """Determine whether to send an SXP response to a request. - Uses SXP if there is no User-Agent, no Accept, or application/sxp is in Accept. - - req request - returns 1 for SXP, 0 otherwise - """ - ok = 0 - user_agent = req.getHeader('User-Agent') - accept = req.getHeader('Accept') - if (not user_agent) or (not accept) or (accept.find(sxp.mime_type) >= 0): - ok = 1 - return ok - - def get_op_method(self, op): - """Get the method for an operation. - For operation 'foo' looks for 'op_foo'. - - op operation name - returns method or None - """ - op_method_name = 'op_' + op - return getattr(self, op_method_name, None) - - def perform(self, req): - """General operation handler for posted operations. - For operation 'foo' looks for a method op_foo and calls - it with op_foo(op, req). Replies with code 500 if op_foo - is not found. - - The method must return a list when req.use_sxp is true - and an HTML string otherwise (or list). - Methods may also return a Deferred (for incomplete processing). - - req request - """ - op = req.args.get('op') - if op is None or len(op) != 1: - req.setResponseCode(http.NOT_ACCEPTABLE, "Invalid request") - return '' - op = op[0] - op_method = self.get_op_method(op) - if op_method is None: - req.setResponseCode(http.NOT_IMPLEMENTED, "Operation not implemented: " + op) - req.setHeader("Content-Type", "text/plain") - req.write("Operation not implemented: " + op) - return '' - else: - return self._perform(op, op_method, req) - - def _perform(self, op, op_method, req): - try: - val = op_method(op, req) - except Exception, err: - self._perform_err(err, op, req) - return '' - - if isinstance(val, defer.Deferred): - val.addCallback(self._perform_cb, op, req, dfr=1) - val.addErrback(self._perform_err, op, req, dfr=1) - return server.NOT_DONE_YET - else: - self._perform_cb(val, op, req, dfr=0) - return '' - - def _perform_cb(self, val, op, req, dfr=0): - """Callback to complete the request. - May be called from a Deferred. - - @param err: the error - @param req: request causing the error - @param dfr: deferred flag - """ - if isinstance(val, error.ErrorPage): - req.write(val.render(req)) - elif self.use_sxp(req): - req.setHeader("Content-Type", sxp.mime_type) - sxp.show(val, out=req) - else: - req.write('') - self.print_path(req) - if isinstance(val, types.ListType): - req.write('
')
-                PrettyPrint.prettyprint(val, out=req)
-                req.write('
') - else: - req.write(str(val)) - req.write('') - if dfr: - req.finish() - - def _perform_err(self, err, op, req, dfr=0): - """Error callback to complete a request. - May be called from a Deferred. - - @param err: the error - @param req: request causing the error - @param dfr: deferred flag - """ - if isinstance(err, Failure): - err = err.getErrorMessage() - elif not (isinstance(err, ArgError) or - isinstance(err, sxp.ParseError) or - isinstance(err, XendError)): - if dfr: - return err - else: - raise - log.exception("op=%s: %s", op, str(err)) - if self.use_sxp(req): - req.setHeader("Content-Type", sxp.mime_type) - sxp.show(['xend.err', str(err)], out=req) - else: - req.setHeader("Content-Type", "text/plain") - req.write('Error ') - req.write(': ') - req.write(str(err)) - if dfr: - req.finish() - - - def print_path(self, req): - """Print the path with hyperlinks. - """ - pathlist = [x for x in req.prepath if x != '' ] - s = "/" - req.write('

/') - for x in pathlist: - s += x + "/" - req.write(' %s/' % (s, x)) - req.write("

") +from xen.web.SrvBase import * diff --git a/tools/python/xen/xend/server/SrvDaemon.py b/tools/python/xen/xend/server/SrvDaemon.py index c63abbb9cb..8c93512e5e 100644 --- a/tools/python/xen/xend/server/SrvDaemon.py +++ b/tools/python/xen/xend/server/SrvDaemon.py @@ -17,20 +17,14 @@ import StringIO import traceback import time -from twisted.internet import pollreactor -pollreactor.install() - +#from twisted.internet import pollreactor; pollreactor.install() from twisted.internet import reactor -from twisted.internet import protocol -from twisted.internet import abstract -from twisted.internet import defer from xen.lowlevel import xu from xen.xend import sxp from xen.xend import PrettyPrint -from xen.xend import EventServer -eserver = EventServer.instance() +from xen.xend import EventServer; eserver = EventServer.instance() from xen.xend.XendError import XendError from xen.xend.server import SrvServer from xen.xend import XendRoot @@ -331,16 +325,20 @@ class Daemon: log.info("Xend Daemon started") self.createFactories() self.listenEvent(xroot) - self.listenVirq() self.listenChannels() - SrvServer.create(bridge=1) + serverthread = SrvServer.create(bridge=1) self.daemonize() + print 'running serverthread...' + serverthread.start() + print 'running reactor...' reactor.run() except Exception, ex: print >>sys.stderr, 'Exception starting xend:', ex + if DEBUG: + traceback.print_exc() + log.exception("Exception starting xend") self.exit(1) - def createFactories(self): self.channelF = channel.channelFactory() @@ -350,18 +348,22 @@ class Daemon: return event.listenEvent(self, port, interface) def listenChannels(self): - self.channelF.start() - - def listenVirq(self): def virqReceived(virq): print 'virqReceived>', virq eserver.inject('xend.virq', virq) + self.channelF.setVirqHandler(virqReceived) + self.channelF.start() def exit(self, rc=0): reactor.disconnectAll() self.channelF.stop() - sys.exit(rc) + # Calling sys.exit() raises a SystemExit exception, which only + # kills the current thread. Calling os._exit() makes the whole + # Python process exit immediately. There doesn't seem to be another + # way to exit a Python with running threads. + #sys.exit(rc) + os._exit(rc) def instance(): global inst diff --git a/tools/python/xen/xend/server/SrvDeviceDir.py b/tools/python/xen/xend/server/SrvDeviceDir.py deleted file mode 100644 index 52f428540d..0000000000 --- a/tools/python/xen/xend/server/SrvDeviceDir.py +++ /dev/null @@ -1,9 +0,0 @@ -# Copyright (C) 2004 Mike Wray - -from SrvDir import SrvDir - -class SrvDeviceDir(SrvDir): - """Device directory. - """ - - pass diff --git a/tools/python/xen/xend/server/SrvDir.py b/tools/python/xen/xend/server/SrvDir.py index 712521c7b3..05694c28a9 100644 --- a/tools/python/xen/xend/server/SrvDir.py +++ b/tools/python/xen/xend/server/SrvDir.py @@ -1,111 +1,3 @@ # Copyright (C) 2004 Mike Wray - -from twisted.protocols import http -from twisted.web import error - -from xen.xend import sxp -from xen.xend.XendError import XendError - -from SrvBase import SrvBase - -class SrvError(error.ErrorPage): - - def render(self, request): - val = error.ErrorPage.render(self, request) - request.setResponseCode(self.code, self.brief) - return val - -class SrvConstructor: - """Delayed constructor for sub-servers. - Does not import the sub-server class or create the object until needed. - """ - - def __init__(self, klass): - """Create a constructor. It is assumed that the class - should be imported as 'import klass from klass'. - - klass name of its class - """ - self.klass = klass - self.obj = None - - def getobj(self): - """Get the sub-server object, importing its class and instantiating it if - necessary. - """ - if not self.obj: - exec 'from %s import %s' % (self.klass, self.klass) - klassobj = eval(self.klass) - self.obj = klassobj() - return self.obj - -class SrvDir(SrvBase): - """Base class for directory servlets. - """ - isLeaf = False - - def __init__(self): - SrvBase.__init__(self) - self.table = {} - self.order = [] - - def noChild(self, msg): - return SrvError(http.NOT_FOUND, msg, msg) - - def getChild(self, x, req): - if x == '': return self - try: - val = self.get(x) - except XendError, ex: - return self.noChild(str(ex)) - if val is None: - return self.noChild('Not found ' + str(x)) - else: - return val - - def get(self, x): - val = self.table.get(x) - if val is not None: - val = val.getobj() - return val - - def add(self, x, xclass = None): - if xclass is None: - xclass = 'SrvDir' - self.table[x] = SrvConstructor(xclass) - self.order.append(x) - - def render_GET(self, req): - try: - if self.use_sxp(req): - req.setHeader("Content-type", sxp.mime_type) - self.ls(req, 1) - else: - req.write('') - self.print_path(req) - self.ls(req) - self.form(req) - req.write('') - return '' - except Exception, ex: - self._perform_err(ex, "GET", req) - - def ls(self, req, use_sxp=0): - url = req.prePathURL() - if not url.endswith('/'): - url += '/' - if use_sxp: - req.write('(ls ') - for k in self.order: - req.write(' ' + k) - req.write(')') - else: - req.write('
    ') - for k in self.order: - v = self.get(k) - req.write('
  • %s
  • ' - % (url, k, k)) - req.write('
') - - def form(self, req): - pass +from xen.web.SrvBase import * +from xen.web.SrvDir import * diff --git a/tools/python/xen/xend/server/SrvDomain.py b/tools/python/xen/xend/server/SrvDomain.py index 698da4624c..e916083cd8 100644 --- a/tools/python/xen/xend/server/SrvDomain.py +++ b/tools/python/xen/xend/server/SrvDomain.py @@ -1,6 +1,6 @@ # Copyright (C) 2004 Mike Wray -from twisted.protocols import http +from xen.web import http from xen.xend import sxp from xen.xend import XendDomain diff --git a/tools/python/xen/xend/server/SrvDomainDir.py b/tools/python/xen/xend/server/SrvDomainDir.py index 59c4322777..5bfd102fcc 100644 --- a/tools/python/xen/xend/server/SrvDomainDir.py +++ b/tools/python/xen/xend/server/SrvDomainDir.py @@ -3,9 +3,7 @@ import traceback from StringIO import StringIO -from twisted.protocols import http -from twisted.web import error -from twisted.python.failure import Failure +from xen.web import http from xen.xend import sxp from xen.xend import XendDomain diff --git a/tools/python/xen/xend/server/SrvRoot.py b/tools/python/xen/xend/server/SrvRoot.py index 8d38937b72..d6d6cb0ae4 100644 --- a/tools/python/xen/xend/server/SrvRoot.py +++ b/tools/python/xen/xend/server/SrvRoot.py @@ -17,7 +17,6 @@ class SrvRoot(SrvDir): ('domain', 'SrvDomainDir' ), ('console', 'SrvConsoleDir' ), ('event', 'SrvEventDir' ), - ('device', 'SrvDeviceDir' ), ('vnet', 'SrvVnetDir' ), ] @@ -28,3 +27,7 @@ class SrvRoot(SrvDir): for (name, klass) in self.subdirs: self.get(name) xroot.start() + + def __repr__(self): + return "" %(id(self), self.table.keys()) + diff --git a/tools/python/xen/xend/server/SrvServer.py b/tools/python/xen/xend/server/SrvServer.py index 353d6eed24..43edbe3126 100644 --- a/tools/python/xen/xend/server/SrvServer.py +++ b/tools/python/xen/xend/server/SrvServer.py @@ -25,16 +25,15 @@ # todo Support security settings etc. in the config file. # todo Support command-line args. -from twisted.web import server, static -from twisted.web import resource, script -from twisted.internet import reactor +from threading import Thread + +from xen.web.httpserver import HttpServer from xen.xend import XendRoot xroot = XendRoot.instance() - from xen.xend import Vifctl - from SrvRoot import SrvRoot +from SrvDir import SrvDir def create(port=None, interface=None, bridge=0): if port is None: @@ -43,16 +42,8 @@ def create(port=None, interface=None, bridge=0): interface = xroot.get_xend_address() if bridge: Vifctl.network('start') - root = resource.Resource() - xend = SrvRoot() - root.putChild('xend', xend) - site = server.Site(root) - reactor.listenTCP(port, site, interface=interface) - -def main(port=None, interface=None): - create(port, interface) - reactor.run() - - -if __name__ == '__main__': - main() + root = SrvDir() + root.putChild('xend', SrvRoot()) + server = HttpServer(root=root, interface=interface, port=port) + thread = Thread(name="XendHttpServer", target=server.run) + return thread diff --git a/tools/python/xen/xend/server/SrvUsbif.py b/tools/python/xen/xend/server/SrvUsbif.py index 0062ab531b..5ca73c9652 100644 --- a/tools/python/xen/xend/server/SrvUsbif.py +++ b/tools/python/xen/xend/server/SrvUsbif.py @@ -1,6 +1,6 @@ # Copyright (C) 2004 Mike Wray -from twisted.protocols import http +from xen.web import http from xen.xend import sxp from xen.xend import XendDomain diff --git a/tools/python/xen/xend/server/SrvXendLog.py b/tools/python/xen/xend/server/SrvXendLog.py index 0edb110572..465207915d 100644 --- a/tools/python/xen/xend/server/SrvXendLog.py +++ b/tools/python/xen/xend/server/SrvXendLog.py @@ -1,6 +1,6 @@ # Copyright (C) 2004 Mike Wray -from twisted.web import static +from xen.web import static from xen.xend import XendRoot @@ -21,4 +21,4 @@ class SrvXendLog(SrvDir): try: return self.logfile.render(req) except Exception, ex: - self._perform_err(ex, req) + self._perform_err(ex, 'log', req) diff --git a/tools/python/xen/xend/server/blkif.py b/tools/python/xen/xend/server/blkif.py index f5401f7573..5ba2b2171c 100755 --- a/tools/python/xen/xend/server/blkif.py +++ b/tools/python/xen/xend/server/blkif.py @@ -249,16 +249,13 @@ class BlkDev(Dev): self.configure(self.config, recreate=recreate) def init(self, recreate=False, reboot=False): - print 'BlkDev>init>' self.frontendDomain = self.getDomain() self.frontendChannel = self.getChannel() backend = self.getBackend() self.backendChannel = backend.backendChannel self.backendId = backend.id - print 'BlkDev>init<' def configure(self, config, change=False, recreate=False): - print 'BlkDev>configure>' if change: raise XendError("cannot reconfigure vbd") self.config = config @@ -282,15 +279,12 @@ class BlkDev(Dev): except: raise XendError('invalid backend domain') - print 'BlkDev>configure<' return self.config def attach(self, recreate=False, change=False): - print 'BlkDev>attach>', self if recreate: - print 'attach>', 'recreate=', recreate node = sxp.child_value(recreate, 'node') - print 'attach>', 'node=', node + print 'BlkDev>attach>', 'recreate=', recreate, 'node=', node self.setNode(node) else: node = Blkctl.block('bind', self.type, self.params) @@ -298,7 +292,6 @@ class BlkDev(Dev): self.attachBackend() if change: self.interfaceChanged() - print 'BlkDev>attach<', self def unbind(self): if self.node is None: return @@ -401,13 +394,10 @@ class BlkDev(Dev): """Attach the device to its controller. """ - print 'BlkDev>attachBackend>' self.getBackend().connect() self.send_be_vbd_create() - print 'BlkDev>attachBackend<' def send_be_vbd_create(self): - print 'BlkDev>send_be_vbd_create>' msg = packMsg('blkif_be_vbd_create_t', { 'domid' : self.frontendDomain, 'blkif_handle' : self.backendId, @@ -443,7 +433,6 @@ class BlkifController(DevController): self.rcvr = None def initController(self, recreate=False, reboot=False): - print 'BlkifController>initController>' self.destroyed = False # Add our handlers for incoming requests. self.rcvr = CtrlMsgRcvr(self.getChannel()) @@ -457,7 +446,6 @@ class BlkifController(DevController): if reboot: self.rebootBackends() self.rebootDevices() - print 'BlkifController>initController<' def sxpr(self): val = ['blkif', ['dom', self.getDomain()]] diff --git a/tools/python/xen/xend/server/console.py b/tools/python/xen/xend/server/console.py index bf4d142593..fbf0ff9bb9 100755 --- a/tools/python/xen/xend/server/console.py +++ b/tools/python/xen/xend/server/console.py @@ -2,7 +2,7 @@ import socket -from twisted.internet import reactor, protocol +from xen.web import reactor, protocol from xen.lowlevel import xu @@ -24,14 +24,13 @@ class ConsoleProtocol(protocol.Protocol): self.console = console self.id = id self.addr = None - self.binary = 0 - def connectionMade(self): + def connectionMade(self, addr=None): peer = self.transport.getPeer() - self.addr = (peer.host, peer.port) + self.addr = addr if self.console.connect(self.addr, self): self.transport.write("Cannot connect to console %d on domain %d\n" - % (self.id, self.console.dom)) + % (self.id, self.console.getDomain())) self.loseConnection() return else: @@ -49,6 +48,7 @@ class ConsoleProtocol(protocol.Protocol): return len(data) def connectionLost(self, reason=None): + print 'ConsoleProtocol>connectionLost>', reason log.info("Console disconnected %s %s %s", str(self.id), str(self.addr[0]), str(self.addr[1])) eserver.inject('xend.console.disconnect', @@ -85,7 +85,6 @@ class ConsoleDev(Dev): STATUS_LISTENING = 'listening' def __init__(self, controller, id, config, recreate=False): - print 'Console>' Dev.__init__(self, controller, id, config) self.status = self.STATUS_NEW self.addr = None @@ -108,7 +107,6 @@ class ConsoleDev(Dev): [self.id, self.getDomain(), self.console_port]) def init(self, recreate=False, reboot=False): - print 'Console>init>' self.destroyed = False self.channel = self.getChannel() self.listen() @@ -165,6 +163,7 @@ class ConsoleDev(Dev): def destroy(self, change=False, reboot=False): """Close the console. """ + print 'ConsoleDev>destroy>', self, reboot if reboot: return self.status = self.STATUS_CLOSED @@ -175,7 +174,8 @@ class ConsoleDev(Dev): def listen(self): """Listen for TCP connections to the console port.. """ - if self.closed(): return + if self.closed(): + return if self.listener: pass else: @@ -193,8 +193,10 @@ class ConsoleDev(Dev): returns 0 if ok, negative otherwise """ - if self.closed(): return -1 - if self.connected(): return -1 + if self.closed(): + return -1 + if self.connected(): + return -1 self.addr = addr self.conn = conn self.status = self.STATUS_CONNECTED @@ -204,6 +206,7 @@ class ConsoleDev(Dev): def disconnect(self, conn=None): """Disconnect the TCP connection to the console. """ + print 'ConsoleDev>disconnect>', conn if conn and conn != self.conn: return if self.conn: self.conn.loseConnection() @@ -288,6 +291,7 @@ class ConsoleController(DevController): self.rebootDevices() def destroyController(self, reboot=False): + print 'ConsoleController>destroyController>', self, reboot self.destroyed = True self.destroyDevices(reboot=reboot) self.rcvr.deregisterChannel() @@ -312,4 +316,6 @@ class ConsoleController(DevController): console = self.getDevice(0) if console: console.receiveOutput(msg) + else: + log.warning('no console: domain %d', self.getDomain()) diff --git a/tools/python/xen/xend/server/controller.py b/tools/python/xen/xend/server/controller.py index 43a500f539..9343c8b132 100755 --- a/tools/python/xen/xend/server/controller.py +++ b/tools/python/xen/xend/server/controller.py @@ -83,7 +83,8 @@ class CtrlMsgRcvr: def lostChannel(self): """Called when the channel to the domain is lost. """ - print 'CtrlMsgRcvr>lostChannel>', + if DEBUG: + print 'CtrlMsgRcvr>lostChannel>', self.channel = None def registerChannel(self): @@ -234,14 +235,12 @@ class DevController: raise NotImplementedError() def createDevice(self, config, recreate=False, change=False): - print 'DevController>createDevice>', 'config=', config, 'recreate=', recreate, 'change=', change dev = self.newDevice(self.nextDeviceId(), config, recreate=recreate) dev.init(recreate=recreate) self.addDevice(dev) idx = self.getDeviceIndex(dev) recreate = self.vm.get_device_recreate(self.getType(), idx) dev.attach(recreate=recreate, change=change) - print 'DevController>createDevice<' def configureDevice(self, id, config, change=False): """Reconfigure an existing device. @@ -290,6 +289,9 @@ class DevController: def getDeviceIds(self): return [ dev.getId() for dev in self.device_order ] + def getDeviceIndexes(self): + return range(0, len(self.device_order)) + def getDevices(self): return self.device_order @@ -314,7 +316,6 @@ class DevController: self.device_order.remove(dev) def rebootDevices(self): - print 'DevController>rebootDevices>', self for dev in self.getDevices(): dev.reboot() @@ -405,7 +406,6 @@ class Dev: def reboot(self): """Reconnect device when the domain is rebooted. """ - print 'Dev>reboot>', self self.init(reboot=True) self.attach() diff --git a/tools/python/xen/xend/server/domain.py b/tools/python/xen/xend/server/domain.py deleted file mode 100644 index eb0dbcf48b..0000000000 --- a/tools/python/xen/xend/server/domain.py +++ /dev/null @@ -1,58 +0,0 @@ -# Copyright (C) 2004 Mike Wray - -from xen.xend.XendError import XendError - -import channel -import controller -from messages import * - -class DomainControllerFactory(controller.ControllerFactory): - """Factory for creating domain controllers. - """ - - def createController(self, dom): - """Create a domain controller. - - dom domain - - returns domain controller - """ - return DomainController(self, dom) - -class DomainController(controller.Controller): - """Generic controller for a domain. - Used for domain shutdown. - """ - - """Map shutdown reasons to the message type to use. - """ - reasons = {'poweroff' : 'shutdown_poweroff_t', - 'reboot' : 'shutdown_reboot_t', - 'suspend' : 'shutdown_suspend_t', - 'sysrq' : 'shutdown_sysrq_t' } - - def __init__(self, factory, dom): - controller.Controller.__init__(self, factory, dom) - self.addMethod(CMSG_SHUTDOWN, 0, None) - self.addMethod(CMSG_MEM_REQUEST, 0, None) - self.registerChannel() - - def shutdown(self, reason, key=0): - """Shutdown a domain. - - reason shutdown reason - key sysrq key (only if reason is 'sysrq') - """ - msgtype = self.reasons.get(reason) - if not msgtype: - raise XendError('invalid reason:' + reason) - extra = {} - if reason == 'sysrq': extra['key'] = key - print extra - self.writeRequest(packMsg(msgtype, extra)) - - def mem_target_set(self, target): - """Set domain memory target in pages. - """ - msg = packMsg('mem_request_t', { 'target' : target * (1 << 8)} ) - self.writeRequest(msg) diff --git a/tools/python/xen/xend/server/event.py b/tools/python/xen/xend/server/event.py index 54fbbde9f0..ebd4d6fd51 100644 --- a/tools/python/xen/xend/server/event.py +++ b/tools/python/xen/xend/server/event.py @@ -1,4 +1,7 @@ -from twisted.internet import reactor, protocol, defer +import sys +import StringIO + +from xen.web import reactor, protocol from xen.lowlevel import xu @@ -64,7 +67,10 @@ class EventProtocol(protocol.Protocol): sxp.show(sxpr, out=io) print >> io io.seek(0) - return self.transport.write(io.getvalue()) + if self.transport: + return self.transport.write(io.getvalue()) + else: + return 0 def send_result(self, res): return self.send_reply(['ok', res]) @@ -135,10 +141,10 @@ class EventProtocol(protocol.Protocol): def op_info(self, name, req): val = ['info'] - val += self.daemon.consoles() - val += self.daemon.blkifs() - val += self.daemon.netifs() - val += self.daemon.usbifs() + #val += self.daemon.consoles() + #val += self.daemon.blkifs() + #val += self.daemon.netifs() + #val += self.daemon.usbifs() return val def op_sys_subscribe(self, name, v): @@ -175,7 +181,6 @@ class EventProtocol(protocol.Protocol): import controller controller.DEBUG = (mode == 'on') - class EventFactory(protocol.Factory): """Asynchronous handler for the event server socket. """ @@ -191,8 +196,6 @@ class EventFactory(protocol.Factory): proto.factory = self return proto - def listenEvent(daemon, port, interface): - protocol = EventFactory(daemon) - return reactor.listenTCP(port, protocol, interface=interface) - + factory = EventFactory(daemon) + return reactor.listenTCP(port, factory, interface=interface) diff --git a/tools/python/xen/xend/server/netif.py b/tools/python/xen/xend/server/netif.py index 18819e8282..2122d3b995 100755 --- a/tools/python/xen/xend/server/netif.py +++ b/tools/python/xen/xend/server/netif.py @@ -4,8 +4,6 @@ import random -from twisted.internet import defer - from xen.xend import sxp from xen.xend import Vifctl from xen.xend.XendError import XendError, VmError @@ -420,7 +418,6 @@ class NetifController(DevController): @param id: interface id @param config: device configuration @param recreate: recreate flag (true after xend restart) - @return: deferred """ return NetDev(self, id, config, recreate=recreate) -- 2.30.2